Movement around Corners

This notebook investigates how pedestrians move around corners. Pedestrians walking around a corner are expected to slow down and follow a path close to the corner. Following RiMEA Test 6 [TODO REF], the scenario lets 20 agents move towards a corner at which they should turn left.

Let’s begin by importing the required packages for our simulation:

from shapely import GeometryCollection, Polygon, to_wkt
import pathlib
import jupedsim as jps
import pedpy
import pandas as pd
import numpy as np
import plotly  # visualise trajectories
import plotly.express as px
import plotly.graph_objects as go
from plotly.graph_objs import Figure
import sqlite3
from numpy.random import normal  # normal distribution of free movement speed

Setting up the Geometry

We define a corridor with a width of 2 meters and a 90° corner halfway along its length:

area = Polygon([(0, 0), (12, 0), (12, 12), (10, 12), (10, 2), (0, 2)])
area
[Figure: rendered outline of the L-shaped corridor polygon]
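As a quick sanity check on the vertex list, the enclosed area can be computed with the shoelace formula. The sketch below is dependency-free; the helper `polygon_area` is hypothetical and not part of JuPedSim or shapely. For the L-shaped corridor it should give 12 x 2 + 2 x 10 = 44 square meters.

```python
def polygon_area(vertices):
    """Return the area of a simple polygon using the shoelace formula."""
    twice_area = 0.0
    n = len(vertices)
    for i in range(n):
        x0, y0 = vertices[i]
        x1, y1 = vertices[(i + 1) % n]  # wrap around to close the ring
        twice_area += x0 * y1 - x1 * y0
    return abs(twice_area) / 2.0


# Same vertices as the corridor polygon defined above
corridor = [(0, 0), (12, 0), (12, 12), (10, 12), (10, 2), (0, 2)]
corridor_area = polygon_area(corridor)
print(corridor_area)  # 44.0
```

With shapely, the same value is available directly as `area.area`.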

Definition of Start Positions and Exit

Next we compute the positions of 20 agents inside a rectangle of 6 x 2 meters in the lower left part of the geometry, using a distribution function from JuPedSim. We assume an agent size of 0.3 m and set the distance parameters accordingly. The exit is placed in the upper right of the geometry, at the end of the corridor.

spawning_area = Polygon([(0, 0), (6, 0), (6, 2), (0, 2)])
num_agents = 20
positions = jps.distributions.distribute_by_number(
    polygon=spawning_area,
    number_of_agents=num_agents,
    distance_to_agents=0.4,
    distance_to_polygon=0.15,
    seed=1,
)
exit_area = Polygon([(10, 11), (12, 11), (12, 12), (10, 12)])
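To illustrate what the two distance parameters constrain, here is a minimal rejection-sampling sketch in plain Python. It is not JuPedSim's implementation: `sample_positions` and its arguments are hypothetical, and it only handles axis-aligned rectangles. A candidate point is kept only if it lies at least `distance_to_polygon` from the rectangle's edges and at least `distance_to_agents` from every point accepted so far.

```python
import math
import random


def sample_positions(width, height, number_of_agents, distance_to_agents,
                     distance_to_polygon, seed=1, max_attempts=100_000):
    """Place points in a [0, width] x [0, height] rectangle by rejection sampling."""
    rng = random.Random(seed)
    accepted = []
    for _ in range(max_attempts):
        if len(accepted) == number_of_agents:
            break
        # Drawing inside the shrunken rectangle enforces the wall distance
        x = rng.uniform(distance_to_polygon, width - distance_to_polygon)
        y = rng.uniform(distance_to_polygon, height - distance_to_polygon)
        # Reject candidates that are too close to an already accepted point
        if all(math.dist((x, y), p) >= distance_to_agents for p in accepted):
            accepted.append((x, y))
    if len(accepted) < number_of_agents:
        raise RuntimeError("could not place all agents; relax the distances")
    return accepted


# Same rectangle size, agent count and distances as in the cell above
sketch_positions = sample_positions(6.0, 2.0, 20, 0.4, 0.15)
print(len(sketch_positions))  # 20
```

Raising an error when the attempt budget runs out makes an overly dense configuration visible instead of silently returning fewer agents.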

TODO: plot config setup - geo + start positions + exit

Setting up the Simulation and Routing Details

Next we create the simulation object, select the operational model (here the collision-free speed model) and define the route for the agents. Since all agents follow the same route, a single journey is sufficient for this scenario.

trajectory_file = "corner.sqlite"  # output file
simulation = jps.Simulation(
    model=jps.CollisionFreeSpeedModel(),
    geometry=area,
    trajectory_writer=jps.SqliteTrajectoryWriter(
        output_file=pathlib.Path(trajectory_file)
    ),
)
exit_id = simulation.add_exit_stage(exit_area.exterior.coords[:-1])
journey = jps.JourneyDescription([exit_id])
journey_id = simulation.add_journey(journey)

Specifying Agent Parameters

Now we define the model-specific parameters for the agents. All agents share the same journey and model parameters except for the free movement speed, which is drawn from a normal distribution with a mean of 1.34 m/s and a standard deviation of 0.2 m/s.

v_distribution = normal(1.34, 0.2, num_agents)
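Because a normal distribution is unbounded, a draw can occasionally be unrealistically small or even negative. The notebook does not say how such values are handled. One possible safeguard, sketched here with the standard library only, is to redraw until the value falls inside plausible bounds. The bounds of 0.5 and 2.0 m/s and the helper `draw_free_speeds` are illustrative assumptions.

```python
import random


def draw_free_speeds(count, mean=1.34, std=0.2, low=0.5, high=2.0, seed=1):
    """Draw normally distributed speeds, redrawing values outside [low, high]."""
    rng = random.Random(seed)
    speeds = []
    while len(speeds) < count:
        v = rng.gauss(mean, std)
        if low <= v <= high:  # keep only plausible walking speeds (assumed bounds)
            speeds.append(v)
    return speeds


bounded_speeds = draw_free_speeds(20)
print(min(bounded_speeds) >= 0.5, max(bounded_speeds) <= 2.0)  # True True
```

Redrawing rather than clipping avoids piling up several agents at exactly the boundary speed.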

Executing the Simulation

Now we can specify the individual starting positions and speeds and add the agents to the simulation. The simulation is then started and iterates until all agents have reached the exit.

for position, v0 in zip(positions, v_distribution):
    simulation.add_agent(
        jps.CollisionFreeSpeedModelAgentParameters(
            journey_id=journey_id, stage_id=exit_id, position=position, v0=v0
        )
    )

while simulation.agent_count() > 0:
    simulation.iterate()
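The loop above ends only when every agent has left through the exit, so a misconfigured geometry or exit would make it run indefinitely. A minimal sketch of an iteration cap follows. `run_until_empty`, the cap value and the stand-in class `CountdownSimulation` are hypothetical; the helper relies only on the `agent_count()` and `iterate()` methods used above.

```python
def run_until_empty(simulation, max_iterations=100_000):
    """Iterate until no agents remain; raise if the cap is reached first."""
    iterations = 0
    while simulation.agent_count() > 0:
        if iterations >= max_iterations:
            raise RuntimeError(
                f"{simulation.agent_count()} agents left after {iterations} iterations"
            )
        simulation.iterate()
        iterations += 1
    return iterations


class CountdownSimulation:
    """Stand-in for demonstration only: one agent leaves per iteration."""

    def __init__(self, agents):
        self._agents = agents

    def agent_count(self):
        return self._agents

    def iterate(self):
        self._agents -= 1


steps_taken = run_until_empty(CountdownSimulation(20))
print(steps_taken)  # 20
```

In the notebook, the real `simulation` object would be passed instead of the stand-in.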

Visualizing the Trajectories

def read_sqlite_file(
    trajectory_file: str,
) -> tuple[pedpy.TrajectoryData, pedpy.WalkableArea]:
    """Load trajectories, frame rate and geometry from the SQLite output file."""
    with sqlite3.connect(trajectory_file) as con:
        data = pd.read_sql_query(
            "select frame, id, pos_x as x, pos_y as y, ori_x as ox, ori_y as oy from trajectory_data",
            con,
        )
        fps = float(
            con.cursor()
            .execute("select value from metadata where key = 'fps'")
            .fetchone()[0]
        )
        walkable_area = (
            con.cursor().execute("select wkt from geometry").fetchone()[0]
        )
        return (
            pedpy.TrajectoryData(data=data, frame_rate=fps),
            pedpy.WalkableArea(walkable_area),
        )


def speed_to_color(speed, min_speed, max_speed, midpoint):
    colorscale = px.colors.diverging.RdBu_r[::-1]

    # Normalize speed based on the midpoint
    if speed >= midpoint:
        normalized_speed = 0.5 + 0.5 * (speed - midpoint) / (
            max_speed - midpoint
        )
    else:
        normalized_speed = 0.5 * (speed - min_speed) / (midpoint - min_speed)

    # Clip to ensure the value is between 0 and 1
    normalized_speed = np.clip(normalized_speed, 0, 1)

    # Find the corresponding color in the colorscale
    color_idx = int(normalized_speed * (len(colorscale) - 1))
    return colorscale[color_idx]


def get_geometry_traces(area):
    geometry_traces = []
    x, y = area.exterior.xy
    geometry_traces.append(
        go.Scatter(
            x=np.array(x),
            y=np.array(y),
            mode="lines",
            line={"color": "grey"},
            showlegend=False,
            name="Exterior",
            hoverinfo="name",
        )
    )
    for inner in area.interiors:
        xi, yi = zip(*inner.coords[:])
        geometry_traces.append(
            go.Scatter(
                x=np.array(xi),
                y=np.array(yi),
                mode="lines",
                line={"color": "grey"},
                showlegend=False,
                name="Obstacle",
                hoverinfo="name",
            )
        )
    return geometry_traces


def get_shapes_for_frame(frame_data, min_speed, max_speed, midpoint):
    def create_shape(row):
        hover_trace = go.Scatter(
            x=[row["x"]],
            y=[row["y"]],
            text=[f"ID: {row['id']}, Pos({row['x']:.2f},{row['y']:.2f})"],
            mode="markers",
            marker=dict(size=1, opacity=1),
            hoverinfo="text",
            showlegend=False,
        )
        if row["speed"] == -1000:  # Check for dummy speed
            return (
                go.layout.Shape(
                    type="circle",
                    xref="x",
                    yref="y",
                    x0=row["x"] - row["radius"],
                    y0=row["y"] - row["radius"],
                    x1=row["x"] + row["radius"],
                    y1=row["y"] + row["radius"],
                    line=dict(width=0),
                    fillcolor="rgba(255,255,255,0)",  # Transparent fill
                ),
                hover_trace,
            )
        color = speed_to_color(row["speed"], min_speed, max_speed, midpoint)
        return (
            go.layout.Shape(
                type="circle",
                xref="x",
                yref="y",
                x0=row["x"] - row["radius"],
                y0=row["y"] - row["radius"],
                x1=row["x"] + row["radius"],
                y1=row["y"] + row["radius"],
                line_color=color,
                fillcolor=color,
            ),
            hover_trace,
        )

    results = frame_data.apply(create_shape, axis=1).tolist()
    shapes = [res[0] for res in results]
    hover_traces = [res[1] for res in results]
    return shapes, hover_traces


def create_fig(
    initial_agent_count,
    initial_shapes,
    initial_hover_trace,
    geometry_traces,
    hover_traces,
    frames,
    steps,
    area_bounds,
    width=800,
    height=800,
):
    minx, miny, maxx, maxy = area_bounds
    fig = go.Figure(
        data=geometry_traces + hover_traces + initial_hover_trace,
        frames=frames,
        layout=go.Layout(
            shapes=initial_shapes,
            title=f"<b>Number of Agents: {initial_agent_count}</b>",
            title_x=0.5,
        ),
    )
    fig.update_layout(
        updatemenus=[
            {
                "buttons": [
                    {
                        "args": [
                            None,
                            {
                                "frame": {"duration": 100, "redraw": True},
                                "fromcurrent": True,
                            },
                        ],
                        "label": "Play",
                        "method": "animate",
                    }
                ],
                "direction": "left",
                "pad": {"r": 10, "t": 87},
                "showactive": False,
                "type": "buttons",
                "x": 0.1,
                "xanchor": "right",
                "y": 0,
                "yanchor": "top",
            }
        ],
        sliders=[
            {
                "active": 0,
                "yanchor": "top",
                "xanchor": "left",
                "currentvalue": {
                    "font": {"size": 20},
                    "prefix": "Frame:",
                    "visible": True,
                    "xanchor": "right",
                },
                "transition": {"duration": 100, "easing": "cubic-in-out"},
                "pad": {"b": 10, "t": 50},
                "len": 0.9,
                "x": 0.1,
                "y": 0,
                "steps": steps,
            }
        ],
        autosize=False,
        width=width,
        height=height,
        xaxis=dict(range=[minx - 0.5, maxx + 0.5]),
        yaxis=dict(
            scaleanchor="x", scaleratio=1, range=[miny - 0.5, maxy + 0.5]
        ),
    )
    return fig


def animate(
    data: pedpy.TrajectoryData, area: pedpy.WalkableArea, *, every_nth_frame=5
):
    data_df = pedpy.compute_individual_speed(traj_data=data, frame_step=5)
    data_df = data_df.merge(data.data, on=["id", "frame"], how="left")
    data_df["radius"] = 0.2
    min_speed = data_df["speed"].min()
    max_speed = data_df["speed"].max()
    midpoint = np.mean(data_df["speed"])
    max_agents = data_df.groupby("frame").size().max()
    dummy_agent_data = {"x": 0, "y": 0, "radius": 0, "speed": -1000}
    frames = []
    steps = []
    hover_traces = []  # stays empty if only one frame is selected
    unique_frames = data_df["frame"].unique()
    selected_frames = unique_frames[::every_nth_frame]
    geometry_traces = get_geometry_traces(area.polygon)
    initial_frame_data = data_df[data_df["frame"] == data_df["frame"].min()]
    initial_agent_count = len(initial_frame_data)
    initial_shapes, initial_hovers = get_shapes_for_frame(
        initial_frame_data, min_speed, max_speed, midpoint
    )
    for frame_num in selected_frames[1:]:
        frame_data = data_df[data_df["frame"] == frame_num]
        agent_count = len(frame_data)
        while len(frame_data) < max_agents:
            dummy_df = pd.DataFrame([dummy_agent_data])
            frame_data = pd.concat([frame_data, dummy_df], ignore_index=True)

        shapes, hover_traces = get_shapes_for_frame(
            frame_data, min_speed, max_speed, midpoint
        )
        frame = go.Frame(
            data=geometry_traces + hover_traces,
            name=str(frame_num),
            layout=go.Layout(
                shapes=shapes,
                title=f"<b>Number of Agents: {agent_count}</b>",
                title_x=0.5,
            ),
        )
        frames.append(frame)
        step = {
            "args": [
                [str(frame_num)],
                {
                    "frame": {"duration": 100, "redraw": True},
                    "mode": "immediate",
                    "transition": {"duration": 500},
                },
            ],
            "label": str(frame_num),
            "method": "animate",
        }
        steps.append(step)

    return create_fig(
        initial_agent_count,
        initial_shapes,
        initial_hovers,
        geometry_traces,
        hover_traces,
        frames,
        steps,
        area.bounds,
        width=800,
        height=800,
    )


# Load the trajectories written by the simulation and animate them
trajectory_data, walkable_area = read_sqlite_file(trajectory_file)
animate(trajectory_data, walkable_area)
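The colour lookup in `speed_to_color` maps speeds piecewise linearly onto [0, 1], so that the midpoint (here the mean speed) lands at the centre of the diverging colour scale. The sketch below isolates that mapping in plain Python. The helper name `normalize_speed` and the fallback to 0.5 when one side of the range has zero width are assumptions, added to avoid a division by zero when all speeds are equal.

```python
def normalize_speed(speed, min_speed, max_speed, midpoint):
    """Map speed to [0, 1] with midpoint at 0.5, linearly on each side."""
    if speed >= midpoint:
        span = max_speed - midpoint
        value = 0.5 if span == 0 else 0.5 + 0.5 * (speed - midpoint) / span
    else:
        span = midpoint - min_speed
        value = 0.5 if span == 0 else 0.5 * (speed - min_speed) / span
    return min(max(value, 0.0), 1.0)  # clip to the valid range


# Illustrative values: speeds between 0.2 and 1.6 m/s with a mean of 1.2 m/s
print(normalize_speed(0.2, 0.2, 1.6, 1.2))  # 0.0
print(normalize_speed(1.2, 0.2, 1.6, 1.2))  # 0.5
print(normalize_speed(1.6, 0.2, 1.6, 1.2))  # 1.0
```

Anchoring the scale at the mean rather than at the centre of the range keeps a few very slow agents from washing out the colour contrast among the rest.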

TODO add colorbar for speed to plot

As expected, the agents choose the shortest path and approach the corner in a funnel-shaped formation. Agents moving close to the corner slow down more than agents at the edge of the crowd, who take a longer path around the corner.

References & Further Exploration

TODO RiMEA reference

This example uses the collision-free speed model. JuPedSim also provides another model, the generalized centrifugal force model (GCFM). For more details on GCFM, refer to the corresponding notebook (TODO: Link to the GCFM notebook).

This demonstration used a simple journey with a single exit. For a more complex journey with multiple intermediate stops and waiting zones, see the upcoming section (TODO: Link to the advanced journey section).